Harden data races between threaded input workers and the main engine#12007
Harden data races between threaded input workers and the main engine#12007erain wants to merge 4 commits into
Conversation
flb_metrics_sum() updates a counter from the owning input or output worker thread while the metrics exporter reads the same counter from the main engine thread. With a threaded input this is a data race reported by ThreadSanitizer; it is benign on the supported hardware but undefined under the C memory model. Add a small flb_atomic.h helper with relaxed load/store/fetch_add and use it for the metric value on both the summing and the reading paths. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Yu Yi <yiyu@google.com>
flb_input_collector_fd() runs on the main engine thread but iterated the collectors of every input, including threaded ones. A threaded input initializes its collector descriptors from its own worker thread, so this races with the main thread reading them at startup. Those collectors are registered and dispatched in the input's own thread and event loop, so they are never matched here. Skipping threaded inputs removes the race and avoids needless iteration. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Yu Yi <yiyu@google.com>
config->grace_input is written once by the engine thread during startup and read concurrently by the supervisor on the main thread, which ThreadSanitizer reports as a data race. Store it with a relaxed atomic; the matching atomic read is done at the supervisor entry point. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Yu Yi <yiyu@google.com>
The supervisor entry point reads config->grace_input, which the engine thread publishes during startup. Read it with a relaxed atomic so the cross-thread access is well defined, matching the engine-side store. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Yu Yi <yiyu@google.com>
📝 WalkthroughWalkthroughAdds relaxed atomic helpers and updates shared scalar reads and writes in engine, metrics, input, and supervisor paths. ChangesShared state atomics
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@include/fluent-bit/flb_atomic.h`:
- Around line 44-53: The fallback in flb_atomic.h currently maps
flb_atomic_load, flb_atomic_store, and flb_atomic_fetch_add to plain accesses,
which is unsafe for shared cross-thread state. Update the `#else` branch to use a
real atomic implementation for non-GCC/Clang compilers, or fail the build with
an explicit error if no atomic backend is available. Keep the fix scoped to the
flb_atomic_* macros so all existing call sites get proper atomic semantics.
In `@src/flb_engine.c`:
- Around line 1138-1139: The grace window update in flb_engine_started setup is
happening too late, so the main thread can observe a stale grace_input value
after FLB_ENGINE_STARTED is published. Move the flb_atomic_store for
config->grace_input to occur before calling flb_engine_started(config), keeping
the update in the startup path around the existing grace_input logic so
src/fluent-bit.c reads the correct supervisor grace window.
In `@src/fluent-bit.c`:
- Around line 1477-1479: The supervisor grace publication is only using an
atomic load in the fixed path, but the later hot-reload paths in the same
function still read ctx->config->grace_input directly and can reintroduce the
race. Update those additional grace publication sites in the function that
handles hot reloads to use flb_atomic_load on ctx->config->grace_input before
calling flb_supervisor_child_update_grace, keeping the access pattern consistent
everywhere in this flow.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bb82bdf9-9dc4-4eb0-ab1c-9e06c68eaf41
📒 Files selected for processing (5)
include/fluent-bit/flb_atomic.hsrc/flb_engine.csrc/flb_input.csrc/flb_metrics.csrc/fluent-bit.c
| #else | ||
|
|
||
| /* | ||
| * Fallback for compilers without the GCC/Clang atomic builtins. The accesses | ||
| * stay plain; on the hardware these compilers target an aligned word access is | ||
| * atomic, and the races guarded here are benign, so correctness is preserved. | ||
| */ | ||
| #define flb_atomic_load(ptr) (*(ptr)) | ||
| #define flb_atomic_store(ptr, val) (*(ptr) = (val)) | ||
| #define flb_atomic_fetch_add(ptr, v) (*(ptr) += (v)) |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -eu
# Verify whether this repository supports non-GCC/non-Clang C compilers and
# whether a platform-specific atomic backend is needed instead of a plain fallback.
{
fd 'CMakeLists\.txt$|.*\.cmake$' .
fd -e h -e c -e cpp .
} | xargs -r rg -n -C2 \
'MSVC|_MSC_VER|CMAKE_C_COMPILER_ID|stdatomic|Interlocked|__atomic|ThreadSanitizer|fsanitize=thread'Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
#!/bin/bash
set -eu
# Inspect the atomic helper header, its call sites, and any platform/compiler
# gating around atomic support.
files=$(git ls-files \
'include/fluent-bit/flb_atomic.h' \
'include/**/*.h' \
'src/**/*.c' \
'CMakeLists.txt' \
'cmake/**/*.cmake' \
'lib/**/*.cmake' \
2>/dev/null || true)
printf '%s\n' "$files" | rg -n 'flb_atomic|_MSC_VER|__atomic|stdatomic|Interlocked|atomic' -n -C 2 --no-heading || true
echo '--- HEADER ---'
sed -n '1,140p' include/fluent-bit/flb_atomic.h
echo '--- CALL SITES ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib || true
echo '--- BUILD/GATING ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID|atomic' CMakeLists.txt cmake include src lib || trueRepository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
#!/bin/bash
set -eu
echo '--- FILE ---'
sed -n '1,160p' include/fluent-bit/flb_atomic.h
echo '--- USAGES ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib
echo '--- WINDOWS/COMPILER SUPPORT ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID' CMakeLists.txt cmake include src libRepository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
#!/bin/bash
set -eu
echo '--- header ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h
echo '--- direct usages ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib
echo '--- compiler support / fallbacks ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID|atomic' CMakeLists.txt cmake include src libRepository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
#!/bin/bash
set -eu
echo '--- include/fluent-bit/flb_atomic.h ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h
echo '--- flb_atomic usages ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib
echo '--- compiler/build gating near fluent-bit atomics ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|__atomic|stdatomic|Interlocked|atomic' CMakeLists.txt cmake include src lib | rg -n 'flb_atomic|atomic' -C 1Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
#!/bin/bash
set -eu
echo '--- include/fluent-bit/flb_atomic.h ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h
echo '--- flb_atomic usages ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib
echo '--- compiler/build gating near fluent-bit atomics ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|__atomic|stdatomic|Interlocked|atomic' CMakeLists.txt cmake include src lib | rg -n 'flb_atomic|atomic' -C 1Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
#!/bin/bash
set -eu
echo 'HEADER:'
sed -n '1,120p' include/fluent-bit/flb_atomic.h
echo
echo 'USAGES:'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src | head -n 200
echo
echo 'BUILD FILES:'
rg -n -C 2 'flb_atomic|__atomic|stdatomic|_MSC_VER|CMAKE_C_COMPILER_ID' CMakeLists.txt cmake include src | head -n 200Repository: fluent/fluent-bit
Length of output: 16002
🏁 Script executed:
#!/bin/bash
set -eu
# Focus only on fluent-bit atomics to determine whether the plain fallback
# is actually used on shared, multi-threaded state and whether any non-GCC/
# non-Clang backend exists for this header.
echo '--- include/fluent-bit/flb_atomic.h ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h
echo
echo '--- flb_atomic call sites ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src
echo
echo '--- fluent-bit build/compiler checks ---'
rg -n -C 2 'flb_atomic|__atomic|stdatomic|_MSC_VER|CMAKE_C_COMPILER_ID|MSVC|Interlocked' CMakeLists.txt cmake include src | head -n 200Repository: fluent/fluent-bit
Length of output: 15416
Replace the plain-access fallback with a real atomic backend or a build error. flb_atomic_load/store/fetch_add() are used on shared fields across threads, so the #else branch reintroduces a data race and makes fetch_add a non-atomic RMW on non-GCC/Clang compilers such as MSVC.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@include/fluent-bit/flb_atomic.h` around lines 44 - 53, The fallback in
flb_atomic.h currently maps flb_atomic_load, flb_atomic_store, and
flb_atomic_fetch_add to plain accesses, which is unsafe for shared cross-thread
state. Update the `#else` branch to use a real atomic implementation for
non-GCC/Clang compilers, or fail the build with an explicit error if no atomic
backend is available. Keep the fix scoped to the flb_atomic_* macros so all
existing call sites get proper atomic semantics.
| /* published to the supervisor/main thread, which reads it concurrently */ | ||
| flb_atomic_store(&config->grace_input, config->grace / 2); |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -eu
# Inspect whether flb_engine_started() is the synchronization point that allows
# flb_start() / the main thread to continue.
rg -n -C5 '\bflb_engine_started\s*\(|\bflb_start\s*\(' src includeRepository: fluent/fluent-bit
Length of output: 5072
🏁 Script executed:
#!/bin/bash
set -eu
# Inspect the startup handshake and the supervisor-side read of grace_input.
sed -n '760,820p' src/flb_engine.c
printf '\n---\n'
sed -n '1,120p' src/flb_lib.c
printf '\n---\n'
sed -n '1458,1485p' src/fluent-bit.cRepository: fluent/fluent-bit
Length of output: 5961
🏁 Script executed:
#!/bin/bash
set -eu
# Inspect the startup handshake and the supervisor-side read of grace_input.
sed -n '760,820p' src/flb_engine.c
printf '\n---\n'
sed -n '1,120p' src/flb_lib.c
printf '\n---\n'
sed -n '1458,1485p' src/fluent-bit.cRepository: fluent/fluent-bit
Length of output: 5961
🏁 Script executed:
#!/bin/bash
set -eu
# Inspect do_start() and the library-side wait for the engine-start notification.
sed -n '880,1045p' src/flb_lib.c
printf '\n---\n'
sed -n '1045,1125p' src/flb_lib.c
printf '\n---\n'
rg -n -C4 'ENGINE_STARTED|FLB_ENGINE_STARTED|ch_notif|flb_pipe_r|flb_pipe_w|started' src/flb_lib.c src/flb_engine.c src/fluent-bit.cRepository: fluent/fluent-bit
Length of output: 13207
🏁 Script executed:
#!/bin/bash
set -eu
# Inspect do_start() and the library-side wait for the engine-start notification.
sed -n '880,1045p' src/flb_lib.c
printf '\n---\n'
sed -n '1045,1125p' src/flb_lib.c
printf '\n---\n'
rg -n -C4 'ENGINE_STARTED|FLB_ENGINE_STARTED|ch_notif|flb_pipe_r|flb_pipe_w|started' src/flb_lib.c src/flb_engine.c src/fluent-bit.cRepository: fluent/fluent-bit
Length of output: 13207
Move the grace_input store before the startup signal. flb_start() returns only after the main thread reads FLB_ENGINE_STARTED, so src/fluent-bit.c can load a stale grace_input and publish the wrong supervisor grace window. Store it before flb_engine_started(config).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/flb_engine.c` around lines 1138 - 1139, The grace window update in
flb_engine_started setup is happening too late, so the main thread can observe a
stale grace_input value after FLB_ENGINE_STARTED is published. Move the
flb_atomic_store for config->grace_input to occur before calling
flb_engine_started(config), keeping the update in the startup path around the
existing grace_input logic so src/fluent-bit.c reads the correct supervisor
grace window.
| /* grace_input is published by the engine thread (flb_engine_start) */ | ||
| flb_supervisor_child_update_grace(ctx->config->grace, | ||
| ctx->config->grace_input); | ||
| flb_atomic_load(&ctx->config->grace_input)); |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Use atomic loads for every supervisor grace publication.
This path is fixed, but the hot-reload paths later in this function still read ctx->config->grace_input directly at Lines 1518-1531. Those paths can reintroduce the same race when the engine thread publishes the value.
Suggested consistency fix
@@
struct flb_cf_section *section;
struct flb_cf *cf_opts;
struct flb_cf_group *group = NULL;
int supervisor_reload_notified = FLB_FALSE;
+ int grace_input;
@@
if (ctx != NULL && ctx->config != NULL) {
/* grace_input is published by the engine thread (flb_engine_start) */
- flb_supervisor_child_update_grace(ctx->config->grace,
- flb_atomic_load(&ctx->config->grace_input));
+ grace_input = flb_atomic_load(&ctx->config->grace_input);
+ flb_supervisor_child_update_grace(ctx->config->grace, grace_input);
@@
- flb_supervisor_child_signal_shutdown(ctx->config->grace,
- ctx->config->grace_input);
+ grace_input = flb_atomic_load(&ctx->config->grace_input);
+ flb_supervisor_child_signal_shutdown(ctx->config->grace, grace_input);
@@
- flb_supervisor_child_update_grace(ctx->config->grace,
- ctx->config->grace_input);
+ grace_input = flb_atomic_load(&ctx->config->grace_input);
+ flb_supervisor_child_update_grace(ctx->config->grace, grace_input);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /* grace_input is published by the engine thread (flb_engine_start) */ | |
| flb_supervisor_child_update_grace(ctx->config->grace, | |
| ctx->config->grace_input); | |
| flb_atomic_load(&ctx->config->grace_input)); | |
| struct flb_cf_section *section; | |
| struct flb_cf *cf_opts; | |
| struct flb_cf_group *group = NULL; | |
| int supervisor_reload_notified = FLB_FALSE; | |
| int grace_input; | |
| ... | |
| /* grace_input is published by the engine thread (flb_engine_start) */ | |
| grace_input = flb_atomic_load(&ctx->config->grace_input); | |
| flb_supervisor_child_update_grace(ctx->config->grace, grace_input); | |
| ... | |
| grace_input = flb_atomic_load(&ctx->config->grace_input); | |
| flb_supervisor_child_signal_shutdown(ctx->config->grace, grace_input); | |
| ... | |
| grace_input = flb_atomic_load(&ctx->config->grace_input); | |
| flb_supervisor_child_update_grace(ctx->config->grace, grace_input); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/fluent-bit.c` around lines 1477 - 1479, The supervisor grace publication
is only using an atomic load in the fixed path, but the later hot-reload paths
in the same function still read ctx->config->grace_input directly and can
reintroduce the race. Update those additional grace publication sites in the
function that handles hot reloads to use flb_atomic_load on
ctx->config->grace_input before calling flb_supervisor_child_update_grace,
keeping the access pattern consistent everywhere in this flow.
Summary
Three data races between threaded input worker threads and the main
engine thread, all surfaced by ThreadSanitizer while running a threaded
tailinput (with multiline) alongside the metrics exporter. They areundefined under the C memory model and benign on the supported hardware, but
worth closing:
flb_metrics_sum()updates a counter from the owninginput/output worker thread while the metrics exporter reads the same counter
from the main thread. This one is live whenever metrics are scraped together
with a threaded input.
flb_input_collector_fd()runs on the main thread but iterated athreaded input's collectors, racing the worker thread that initializes those
collector fds at startup. A threaded input's collectors are registered and
dispatched in the input's own thread/event loop and are never matched here,
so the handler now skips threaded inputs (also a small optimization).
config->grace_inputis published by the engine threadduring startup and read by the supervisor on the main thread.
A small
include/fluent-bit/flb_atomic.hhelper is added (relaxed GCC/Clang__atomicbuiltins, plain-access fallback) and used for the metric counter andgrace_input.Testing
Built with ThreadSanitizer (
-fsanitize=thread, jemalloc disabled) and run for~45s against a threaded
tail+multiline.parser cri+nullpipeline drivenby a CRI log generator (~6M lines, 6 files):
flb_metrics_sum,flb_input_collector_fd,grace_input).Example configuration used:
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
Bug Fixes
New Features